package com.it.table;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink Table API demo that registers a file-backed {@code click_source} table
 * whose DDL declares a computed timestamp column and a watermark on it.
 *
 * @author code1997
 */
public class TimeAndWindowDemo {

    /**
     * Creates the stream and table environments and executes the
     * {@code click_source} DDL. No query is run against the table here.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism is pinned to 1 for this demo.
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql(clickSourceDdl());
    }

    /**
     * Assembles the CREATE TABLE statement for {@code click_source}.
     *
     * <p>The statement declares {@code user}, {@code url} and {@code ts} columns,
     * a computed {@code event_ts} column derived from {@code ts/1000}, and a
     * watermark that trails {@code event_ts} by one second. The table is bound to
     * the {@code filesystem} connector, reading {@code data/chapter01/click.txt}
     * in {@code csv} format.
     *
     * @return the full DDL text
     */
    private static String clickSourceDdl() {
        return String.join("",
                "CREATE TABLE click_source (",
                "`user` STRING, ",
                "`url` STRING, ",
                "`ts` BIGINT, ",
                "`event_ts` AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000)), ",
                "WATERMARK FOR event_ts AS event_ts - INTERVAL '1' SECOND ",
                ") WITH (",
                " 'connector' = 'filesystem' ,",
                " 'path' = 'data/chapter01/click.txt' ,",
                " 'format' = 'csv'",
                ")");
    }
}
